package day02.source;

import beans.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * Flink streaming API — source examples.
 *
 * <p>Demonstrates the four common ways to feed a {@code DataStream}:
 * an in-memory collection, a text file, a Kafka topic, and a
 * user-defined {@code SourceFunction} ({@code MySensor}).
 *
 * @author lvbingbing
 * @date 2021-09-04 17:04
 */
public class FlinkSource {

    /** Shared execution environment; initialized once in {@link #main} before any source is added. */
    private static StreamExecutionEnvironment env;

    public static void main(String[] args) throws Exception {
        // Create the execution environment shared by all example sources.
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1. Read from an in-memory collection.
        readFromCollection();
        // 2. Read from a file.
        readFromFile();
        // 3. Read from a Kafka source.
        readFromKafka();
        // 4. Read from a custom source.
        readFromCustomizeSource();
        // The pipeline is built lazily; nothing runs until execute() is called.
        env.execute();
    }

    /**
     * Reads a bounded stream from an in-memory collection and prints it.
     */
    private static void readFromCollection() {
        List<String> list = Arrays.asList("111", "222", "333");
        DataStreamSource<String> dataStream = env.fromCollection(list);
        // Print to stdout (task-slot-prefixed when parallelism > 1).
        dataStream.print();
    }

    /**
     * Reads a bounded stream of lines from a local text file and prints it.
     */
    private static void readFromFile() {
        // Relative path — resolved against the JVM working directory.
        DataStreamSource<String> dataStreamFile = env.readTextFile("input/hello.txt");
        // Print to stdout.
        dataStreamFile.print();
    }

    /**
     * Reads an unbounded stream from a Kafka topic and prints it.
     *
     * <p>Kept {@code public} (unlike the sibling helpers) for backward
     * compatibility with any external callers.
     */
    public static void readFromKafka() {
        String topic = "sensor";
        Properties props = new Properties();
        // bootstrap.servers
        props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // group.id
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
        // NOTE(review): Flink's Kafka consumer forces ByteArrayDeserializer
        // internally and decodes records with the SimpleStringSchema passed
        // below, so the two deserializer properties here are effectively
        // ignored — confirm and consider removing.
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Start from the latest offset when no committed offset exists.
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        DataStreamSource<String> dataStream = env.addSource(new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props));
        // Print to stdout.
        dataStream.print();
    }

    /**
     * Reads an unbounded stream from the user-defined {@code MySensor} source
     * and prints it.
     */
    private static void readFromCustomizeSource() {
        // Read from the custom source.
        DataStreamSource<SensorReading> dataStream = env.addSource(new MySensor());
        // Scope parallelism 1 to this source only. The original code called
        // env.setParallelism(1) here, which silently changed the default
        // parallelism of EVERY operator in the job (Flink resolves defaults
        // at execute() time), not just this source.
        dataStream.setParallelism(1);
        // Print to stdout.
        dataStream.print();
    }
}
